<?php

namespace app\service;

use think\facade\Config;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Mq {
    /**
     * 生产者
     * @param $data array 发送的内容
     * @param $queue string 队列名 假如是路由模式和主题模式，含义就是路由键
     * @param string $exchange 交换机名
     * @param bool $exchangeType 交换机类型
     * @param bool $isDlx 是否死信队列
     * @param array $dlx 死信队列配置内容
     * @throws \Exception
     */
    public function producer($data, $queue, $exchange = '', $exchangeType = false, $isDlx = false, $dlx = []) {
        try {
            $param = Config::get('common.mq');
            $connection = new AMQPStreamConnection(
                $param['host'],
                $param['port'],
                $param['login'],
                $param['password'],
                $param['vhost']
            );
            //将要发送的数据变成json字符串
            $messageBody = json_encode($data);
            //连接信道
            $channel = $connection->channel();
            //开始事务
            //$channel->tx_select();
            //确认机制
            // $channel->confirm_select();
            // $channel->set_ack_handler(function (AMQPMessage $message){
            //     echo 'ack-' . $message->getBody() . PHP_EOL;
            // });
            // $channel->set_nack_handler(function (AMQPMessage $message){
            //     echo 'nack-' . $message->getBody() . PHP_EOL;
            // });

            if ($exchangeType) {
                /* 声明交换机
                 * 参数1:交换机名
                 * 参数2:交换机类型
                 * 参数3:检查是否存在
                 * 参数4:是否持久化
                 * 参数5:是否自动删除
                 */
                $channel->exchange_declare($exchange, $exchangeType, false, true, false);
            } else {
                $arguments = [];
                if ($isDlx) {
                    $arguments = new AMQPTable([
                        'x-dead-letter-exchange' => $dlx['exchange'],//死信交换机名
                        'x-message-ttl' => $dlx['ttl'], //消息的存活时间
                        'x-dead-letter-routing-key' => $dlx['routingKey']
                    ]);
                }

                /*
                 * 声明队列,前5个参数描述如下:
                 * 参数1:队列名
                 * 参数2:检测队列是否存在，true只检测不创建,false 会创建
                 * 参数3:是否持久化队列 true持久化
                 * 参数4:私有队列 true代表私有
                 * 参数5:当消费者断开连接后，队列是否自动删除
                 */
                $channel->queue_declare($queue, false, true, false, false, false, $arguments);
            }

            //创建amqp消息类型
            $properties = [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //持久化
            ];
            $message = new AMQPMessage($messageBody, $properties);

            /*
             * 发送消息
             * 参数1:消息内容
             * 参数2:交换机名称
             * 参数3:队列名称
             */
            $channel->basic_publish($message, $exchange, $queue);
            //$channel->wait_for_pending_acks_returns(5);
            //提交事务
            //$channel->tx_commit();
            $channel->close(); //关闭信道
            $connection->close(); //关闭连接
        } catch (\Exception $e) {
            //$channel->tx_rollback();
            $channel->close();
            $connection->close();
            echo $e->getMessage();
        }
    }

    /**
     * 消费者
     * @param $callback 回调函数
     */
    public function consumer($callback, $queue, $exchange = '', $exchangeType = false, $routingKey = '') {
        try {
            $param = Config::get('common.mq');
            $connection = new AMQPStreamConnection(
                $param['host'],
                $param['port'],
                $param['login'],
                $param['password'],
                $param['vhost']
            );
            //连接信道
            $channel = $connection->channel();
            //开始事务
            $channel->tx_select();
            //声明队列
            $channel->queue_declare($queue, false, true, false, false);
            if ($exchangeType) {
                //声明交换机
                $channel->exchange_declare($exchange, $exchangeType, false, true, false);
                //绑定交换机和队列
                $channel->queue_bind($queue, $exchange, $routingKey);
            }

            /* 消费消息
             * 参数1:队列名
             * 参数2:消费者标签
             * 参数3:AMQP标准
             * 参数4:是否自动应答，ack true自动应答,false手动应答
             * 参数5:是否排他
             * 参数7:回调函数
             */
            $channel->basic_consume($queue, '', false, false, false, false, $callback);
            $channel->tx_commit();

            //阻塞队列监听事件
            while (count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        } catch (\Exception $e) {
            //$channel->tx_rollback();
            $channel->close();
            $connection->close();
            echo $e->getMessage();
        }
    }
}